Apache Spark MLlib

MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides tools such as:

  • ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
  • Featurization: feature extraction, transformation, dimensionality reduction, and selection
  • Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
  • Persistence: saving and loading algorithms, models, and Pipelines
  • Utilities: linear algebra, statistics, data handling, etc.

You can find two machine learning APIs in Spark:

* spark.mllib: the RDD-based API
* spark.ml: the DataFrame-based API

spark.ml is the primary ML library in Spark. spark.mllib is in maintenance mode: it can still be used and will receive bug fixes, but it will not get any new features.

As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.

DataFrames provide a more user-friendly API than RDDs. The many benefits of DataFrames include Spark Datasources, SQL/DataFrame queries, Tungsten and Catalyst optimizations, and uniform APIs across languages.


In [1]:
from pyspark.sql import SparkSession
import pyspark 

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext

Data Used

We will use different data sets:

  • Papers Dataset:

This is a dataset containing conferences, authors, and paper titles.


In [2]:
df = spark.read.csv(path = '../data/papers.csv', header = False, inferSchema = True)

In [4]:
df.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)


In [6]:
df.show(5)


+---+----+---+--------+--------------------+
|_c0| _c1|_c2|     _c3|                 _c4|
+---+----+---+--------+--------------------+
|  0|1996|  3|3CONLINE|Shene, A Comparat...|
|  1|1994|  1|3CONLINE|Janik, Textbooks ...|
|  2|1999|  4|THESTATE|Fink & Mao, The 8...|
|  3|2000| 17|    AAAI|Achlioptas & Gome...|
|  4|2000| 17|    AAAI|Bejar & Manya, So...|
+---+----+---+--------+--------------------+
only showing top 5 rows

  • Black Friday Dataset

source: https://datahack.analyticsvidhya.com/contest/black-friday

The dataset contains customer demographics (age, gender, marital status, city_type, stay_in_current_city), product details (product_id and product category), and the total purchase_amount from last month.

Variable definitions:

  • User_ID: User ID
  • Product_ID: Product ID
  • Gender: Sex of User
  • Age: Age in bins
  • Occupation: Occupation (Masked)
  • City_Category: Category of the City (A, B, C)
  • Stay_In_Current_City_Years: Number of years of stay in current city
  • Marital_Status: Marital Status
  • Product_Category_1: Product Category (Masked)
  • Product_Category_2: The product may also belong to another category (Masked)
  • Product_Category_3: The product may also belong to another category (Masked)
  • Purchase: Purchase Amount (Target Variable)

In [7]:
bf_train = spark.read.csv(path = '../data/blackfriday_train.csv', header = True, inferSchema = True)
bf_test = spark.read.csv(path = '../data/blackfriday_test.csv', header = True, inferSchema = True)

In [8]:
bf_train.printSchema()


root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)

DataFrames Manipulations

source: https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

How to see the data types of the columns?

To see the types of the columns in a DataFrame, we can use printSchema(). Calling printSchema() on a DataFrame shows the schema in a tree format.


In [9]:
df.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)

How to show the first n observations?

We can use the head operation to see the first n observations (say, 5). The head operation in PySpark returns a list of Rows.


In [10]:
df.head(5)


Out[10]:
[Row(_c0=0, _c1=1996, _c2='3', _c3='3CONLINE', _c4='Shene, A Comparative Study of Linked List Sorting Algorithms'),
 Row(_c0=1, _c1=1994, _c2='1', _c3='3CONLINE', _c4='Janik, Textbooks for Teaching C++'),
 Row(_c0=2, _c1=1999, _c2='4', _c3='THESTATE', _c4='Fink & Mao, The 85 Ways to Tie a Tie:  The Science and Aesthetics of Tie Knots'),
 Row(_c0=3, _c1=2000, _c2='17', _c3='AAAI', _c4='Achlioptas & Gomes & Kautz & Selman, Generating Satisfiable Problem Instances'),
 Row(_c0=4, _c1=2000, _c2='17', _c3='AAAI', _c4='Bejar & Manya, Solving the Round Robin Problem Using Propositional Logic')]

To see the result formatted as DataFrame output, we can use the show operation.

We can pass the argument truncate=True to truncate the result (long values won't be shown completely).

Note that show does not return any data; it just prints the DataFrame contents.


In [11]:
print(type(df.show(2,truncate= True)))

#df.show(2,truncate= True)


+---+----+---+--------+--------------------+
|_c0| _c1|_c2|     _c3|                 _c4|
+---+----+---+--------+--------------------+
|  0|1996|  3|3CONLINE|Shene, A Comparat...|
|  1|1994|  1|3CONLINE|Janik, Textbooks ...|
+---+----+---+--------+--------------------+
only showing top 2 rows

<class 'NoneType'>

How to count the number of rows in a DataFrame?

We can use the count operation to count the number of rows in a DataFrame.


In [12]:
df.count()


Out[12]:
123315

How many columns does the DataFrame have, and what are their names?

To get the column names we can use columns on the DataFrame, similar to what we do with a pandas DataFrame.


In [13]:
len(df.columns), df.columns


Out[13]:
(5, ['_c0', '_c1', '_c2', '_c3', '_c4'])

How to get the summary statistics (mean, standard deviation, min, max, count) of the numerical columns in a DataFrame?

The describe operation is used to calculate the summary statistics of numerical column(s) in a DataFrame. If we don't specify column names, it calculates summary statistics for all numerical columns present in the DataFrame.


In [14]:
df.describe().show()


+-------+-----------------+------------------+------------------+--------+--------------------+
|summary|              _c0|               _c1|               _c2|     _c3|                 _c4|
+-------+-----------------+------------------+------------------+--------+--------------------+
|  count|           123315|            123315|            123315|  123315|              123314|
|   mean|          61657.0|1995.1891578477882| 90.64012293936854|    null|                null|
| stddev|35598.11855983403| 25.75542351380241|344.61551232986926|    null|                null|
|    min|                0|               196|                 1|3CONLINE|"Baker, ""Natural...|
|    max|           123314|              2008|                 Z|       d|withheld, FREE Ha...|
+-------+-----------------+------------------+------------------+--------+--------------------+

Note that describe() is a transformation, so the result is a DataFrame that we can collect or transform again.


In [16]:
df.describe()


Out[16]:
DataFrame[summary: string, _c0: string, _c1: string, _c2: string, _c3: string, _c4: string]

In [17]:
type(df.describe())


Out[17]:
pyspark.sql.dataframe.DataFrame

As we can see, the describe operation also works for string-type columns, but the output for mean and stddev is null, and the min and max values are calculated based on the lexicographic (ASCII) ordering of the values.


In [ ]:
df.describe("_c0").show()

How to find the number of distinct values in a column?

The distinct operation can be used here to calculate the number of distinct rows in a DataFrame. Let's apply the distinct operation to count the distinct values of _c0 in df.


In [18]:
unique_elements = df.select('_c0').distinct().collect()
len(unique_elements)


Out[18]:
123315

In [ ]:
df.select('_c0').distinct().count()

In [23]:
# if we want to get the different number of conferences
df.select("_c3").distinct().show(5)


+---------+
|      _c3|
+---------+
|AJPHYSICS|
|   ICVLSI|
|  NTMATHU|
|  ATHCOMP|
|  NOTICES|
+---------+
only showing top 5 rows

What if I want to calculate the pairwise frequency of categorical columns?

We can use the crosstab operation on a DataFrame to calculate the pairwise frequency of columns. Let's apply the crosstab operation to the Age[years] and Sex columns of a new DataFrame, df2.


In [24]:
df2 = spark.read.csv(path = '../data/people.csv', header = True, inferSchema = True)

In [25]:
df2.crosstab('Age[years]', 'Sex').show()


+--------------+-------+-----+
|Age[years]_Sex| female| male|
+--------------+-------+-----+
|          42.0|      1|    0|
|          37.0|      0|    1|
|          29.0|      0|    1|
|          61.0|      1|    0|
|          33.0|      0|    1|
|          77.0|      1|    0|
|          32.0|      1|    0|
|          45.0|      0|    1|
|          18.0|      0|    1|
|          19.0|      0|    1|
+--------------+-------+-----+

In the output above, the first column of each row holds the distinct values of Age[years] and the remaining column names are the distinct values of Sex. The name of the first column is Age[years]_Sex. Pairs with no occurrences have a zero count in the contingency table.

What if I want to get a DataFrame without the duplicate rows of a given DataFrame?

We can use the dropDuplicates operation to drop the duplicate rows of a DataFrame and obtain a DataFrame without duplicates.

Let's apply this to the two columns Sex and Eye Color of df2 to get all the unique rows for these columns.


In [27]:
df2.select('Sex', 'Eye Color').show()


+-------+---------+
|    Sex|Eye Color|
+-------+---------+
| female|    brown|
|   male|    green|
|   male|     blue|
| female|     blue|
| female|     gray|
|   male|    green|
| female|    brown|
|   male|    brown|
|   male|    green|
|   male|     gray|
+-------+---------+


In [26]:
df2.select('Sex', 'Eye Color').dropDuplicates().show()


+-------+---------+
|    Sex|Eye Color|
+-------+---------+
|   male|    brown|
| female|     gray|
|   male|     blue|
| female|     blue|
|   male|    green|
| female|    brown|
|   male|     gray|
+-------+---------+


In [28]:
df2.select('Sex', 'Eye Color').count()


Out[28]:
10

In [29]:
df2.select('Sex', 'Eye Color').dropDuplicates().count()


Out[29]:
7

What if I want to drop all rows with null values?

The dropna operation can be used here. To decide which rows to drop, it considers three options:

  • how: 'any' or 'all'. If 'any', drop a row if it contains any nulls. If 'all', drop a row only if all of its values are null.
  • thresh: int, default None. If specified, drop rows that have fewer than thresh non-null values. This overrides the how parameter.
  • subset: optional list of column names to consider.

Let's drop the null rows in bf_train with the default parameters and count the rows in the output DataFrame. The defaults are 'any', None, None for how, thresh, subset respectively (a sketch of the thresh and subset options follows the cells below).


In [30]:
bf_train.count()


Out[30]:
550068

In [31]:
bf_train.dropna().count()


Out[31]:
166821
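
For illustration, here is a minimal sketch of the other dropna options described above (the thresh value and the column names used here are just examples):

In [ ]:
# keep only rows with at least 10 non-null values
bf_train.dropna(thresh=10).count()

# consider only nulls in the two product category columns
bf_train.dropna(subset=['Product_Category_2', 'Product_Category_3']).count()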

What if I want to fill the null values in a DataFrame with a constant number?

We can use the fillna operation here. fillna takes two parameters to fill the null values:

  • value:
    • a dictionary specifying, for each column, the value to use as replacement, or
    • a single value (int, float, string) applied to all columns.
  • subset: an optional list of columns to restrict the filling to.

Let's fill -1 in place of the null values in the bf_train DataFrame (the dictionary and subset forms are sketched after the cells below).


In [ ]:
bf_train.show(2)

In [ ]:
bf_train.fillna(-1).show(2)
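
As a sketch of the other fillna forms mentioned above (the fill values here are chosen arbitrarily):

In [ ]:
# dictionary form: a different fill value per column
bf_train.fillna({'Product_Category_2': -1, 'Product_Category_3': -2}).show(2)

# subset form: fill only the listed columns
bf_train.fillna(-1, subset=['Product_Category_2']).show(2)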

What if I want to filter the rows in bf_train that have a Purchase of more than 15000?

We can apply the filter operation on the Purchase column of the bf_train DataFrame to keep only the rows with values greater than 15000.

We need to pass a condition to filter.

Let's apply the filter on the Purchase column of bf_train and print the number of rows with a Purchase greater than 15000 (combining several conditions is sketched after the cell below).


In [ ]:
bf_train.filter(bf_train.Purchase > 15000).count()
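
Conditions can also be combined with & and |, wrapping each condition in parentheses. A minimal sketch (assuming Gender is coded as 'F'/'M'):

In [ ]:
# rows with a Purchase above 15000 made by female customers
bf_train.filter((bf_train.Purchase > 15000) & (bf_train.Gender == 'F')).count()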

How to find the mean Purchase of each age group in bf_train?

The groupby operation can be used here to find the mean of Purchase for each age group in bf_train. Let's see how we can get the mean purchase for each value of the Age column.


In [ ]:
bf_train.groupby('Age').agg({'Purchase': 'mean'}).show()

We can also apply sum, min, max, and count with groupby when we want different summary insights for each group.

Let's take one more example of groupby to count the number of rows in each Age group (computing several aggregations at once is sketched after the cell below).


In [ ]:
bf_train.groupby('Age').count().show()
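
A sketch computing several aggregations at once using functions from pyspark.sql.functions:

In [ ]:
from pyspark.sql import functions as F

# mean, max and count of Purchase per Age group
bf_train.groupby('Age').agg(F.mean('Purchase'), F.max('Purchase'), F.count('Purchase')).show()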

How to create a sample DataFrame from the base DataFrame?

We can use the sample operation to take a sample of a DataFrame.

The sample method on a DataFrame returns a DataFrame containing a sample of the base DataFrame. The sample method takes 3 parameters:

  • withReplacement = True or False, to sample observations with or without replacement.
  • fraction = x, where e.g. x = .5 means that we want roughly 50% of the data in the sample DataFrame.
  • seed, to reproduce the result.

Let's create two DataFrames t1 and t2 from bf_train, each with a 20% sample of the data, and count the number of rows in each.


In [ ]:
t1 = bf_train.sample(False, 0.2, 42)
t2 = bf_train.sample(False, 0.2, 43)
t1.count(),t2.count()

How to apply a map operation on DataFrame columns?

We can apply a function to each row of a DataFrame using the map operation on its underlying RDD. After applying the function, we get the result in the form of an RDD. Let's apply a map operation on the User_ID column of bf_train, mapping each row x to (x, 1), and print the first 5 elements of the mapped RDD.


In [ ]:
bf_train.select('User_ID').rdd.map(lambda x:(x,1)).take(5)

How to sort a DataFrame based on column(s)?

We can use the orderBy operation on a DataFrame to get output sorted by one or more columns. The orderBy operation takes two arguments:

  • a list of columns, and
  • ascending = True or False to get the results in ascending or descending order (a list of booleans in case of more than one column).

Let's sort the bf_train DataFrame based on Purchase (the list form is sketched after the cell below).


In [ ]:
bf_train.orderBy(bf_train.Purchase.desc()).show(5)
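
For reference, the list form described above would look like this (the column choice is just an example):

In [ ]:
# sort by Age ascending, then by Purchase descending
bf_train.orderBy(['Age', 'Purchase'], ascending=[True, False]).show(5)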

How to add a new column to a DataFrame?

We can use the withColumn operation to add a new column to (or replace a column in) the base DataFrame and return a new DataFrame. The withColumn operation takes 2 parameters:

  • the name of the column we want to add/replace, and
  • an expression on existing columns.

Let's see how withColumn works by computing a new column Purchase_new in bf_train, calculated by dividing the Purchase column by 2.


In [ ]:
bf_train.withColumn('Purchase_new', bf_train.Purchase /2.0).select('Purchase','Purchase_new').show(5)

We can also use user-defined functions (UDFs) with withColumn.


In [34]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

to_cat = udf(lambda x: "expensive" if x > 15000 else "cheap", StringType())
bf_train.withColumn('Purchase_cat', to_cat(bf_train["Purchase"])).select('Purchase_cat').show(5)


+------------+
|Purchase_cat|
+------------+
|       cheap|
|   expensive|
|       cheap|
|       cheap|
|       cheap|
+------------+
only showing top 5 rows

How to drop a column in a DataFrame?

To drop a column from a DataFrame we can use the drop operation. Let's drop a column called Comb from bf_test and list the remaining columns (since bf_test has no column named Comb, drop simply returns the DataFrame unchanged).


In [35]:
bf_test.drop('Comb').columns


Out[35]:
['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3']

How to split a DataFrame into two new DataFrames?

Sometimes we will want to divide a DataFrame randomly into separate parts.


In [36]:
train, test = df.randomSplit([0.9, 0.1], seed=12345)

In [37]:
train.count()


Out[37]:
110873

In [38]:
test.count()


Out[38]:
12442

Algebraic Structures

Dense and Sparse Vectors

  • A vector is a one-dimensional array of elements.

    The natural Python implementation of a vector is a one-dimensional list. However, in many applications, the elements of a vector have mostly zero values. Such a vector is said to be sparse.

    It is inefficient to use a one-dimensional list to store a sparse vector. It is also inefficient to add elements whose values are zero when forming sums of sparse vectors. Consequently, we should choose a different representation.

  • A dense vector is the most natural implementation, using a one-dimensional list.

A sparse vector is represented by two parallel arrays: indices and values. Zero entries are not stored. A dense vector is backed by a double array representing its entries. For example, the vector [1., 0., 0., 0., 0., 0., 3.] can be represented in the sparse format as (7, [0, 6], [1., 3.]), where 7 is the size of the vector.

(source: https://databricks.com/blog/2014/07/16/new-features-in-mllib-in-spark-1-0.html)


In [39]:
from pyspark.ml.linalg import SparseVector, DenseVector, Matrices

In [40]:
sv1 = SparseVector(3, [0, 2], [1.0, 3.0])
sv1


Out[40]:
SparseVector(3, {0: 1.0, 2: 3.0})

In [41]:
dv1 = DenseVector([1.0, 3.0])
dv1


Out[41]:
DenseVector([1.0, 3.0])
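
As a quick check of the example above, we can build the same 7-element vector in both representations and compare their dense contents:

In [ ]:
sv2 = SparseVector(7, [0, 6], [1.0, 3.0])   # size, indices, values
dv2 = DenseVector([1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0])
sv2.toArray(), dv2.toArray()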

We can also have Sparse and Dense Matrices


In [ ]:
Matrices.dense(2,2,[1,2,3,4])

In [ ]:
# Matrices.sparse(numRows, numCols, colPtrs, rowIndices, values) -- CSC format
sparse_mat = Matrices.sparse(2, 2, [0, 1, 2], [0, 1], [1, 1])

In [ ]:
dense_mat = Matrices.dense(2,2,[1,2,3,4])

Instead of representing each feature as a separate column, it is common to represent all the variables of an instance as a single vector, which can indeed be sparse.


In [ ]:
sparse_df = sc.parallelize([
  (1, SparseVector(10, {1: 1.0, 2: 1.0, 3: 2.0, 4: 1.0, 5: 3.0})),
  (2, SparseVector(10, {9: 100.0})),
  (3, SparseVector(10, {1: 1.0})),
]).toDF(["row_num", "features"])
sparse_df.show()

In [ ]:
dense_df = sc.parallelize([
  (1, DenseVector([1,2,3,4])),
  (2, DenseVector([1,2,3,4])),
  (3, DenseVector([1,3,4,5])),
]).toDF(["row_num", "features"])
dense_df.show()

When to Exploit Sparsity

from: https://databricks.com/blog/2014/07/16/new-features-in-mllib-in-spark-1-0.html

For many large-scale datasets, it is not feasible to store the data in a dense format. Nevertheless, for medium-sized data, it is natural to ask when we should switch from a dense format to sparse. In MLlib, a sparse vector requires 12nnz+4 bytes of storage, where nnz is the number of nonzeros, while a dense vector needs 8n bytes, where n is the vector size. So storage-wise, the sparse format is better than the dense format when more than 1/3 of the elements are zero. However, assuming that the data can be fit into memory in both formats, we usually need sparser data to observe a speedup, because the sparse format is not as efficient as the dense format in computation. Our experience suggests a sparsity of around 10%, while the exact switching point for the running time is indeed problem-dependent.
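
A quick back-of-the-envelope check of that break-even point, in plain Python, using the storage formulas quoted above:

In [ ]:
n = 1000                       # vector size
dense_bytes = 8 * n            # dense storage
for nnz in (100, 500, 666, 667):
    sparse_bytes = 12 * nnz + 4
    print(nnz, sparse_bytes, dense_bytes, sparse_bytes < dense_bytes)
# the sparse format stops paying off at roughly nnz = 2n/3,
# i.e. when fewer than 1/3 of the elements are zero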

Pipelines

from: https://spark.apache.org/docs/2.3.2/ml-pipeline.html

In this section, we introduce and practice with the concept of ML Pipelines.

ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.

MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. This section covers the key concepts introduced by the Pipelines API, where the pipeline concept is mostly inspired by the scikit-learn project.

  • DataFrame: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.

  • Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.

  • Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

  • Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

Transformers

A Transformer is an abstraction that includes feature transformers and learned models.

Technically, a Transformer implements a method transform(), which converts one DataFrame into another, generally by appending one or more columns.

For example:

A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.

A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.

Estimators

An Estimator abstracts the concept of a learning algorithm or any algorithm that fits or trains on data. Technically, an Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer.

For example, a learning algorithm such as LogisticRegression is an Estimator, and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.
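
A minimal sketch of this fit/transform pattern, using StandardScaler as the Estimator (the tiny DataFrame is made up for illustration):

In [ ]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

toy_df = spark.createDataFrame(
    [(Vectors.dense([1.0, 10.0]),), (Vectors.dense([3.0, 30.0]),)],
    ["features"])

scaler = StandardScaler(inputCol="features", outputCol="scaled")   # Estimator
scaler_model = scaler.fit(toy_df)                                  # fit() returns a Model, i.e. a Transformer
scaler_model.transform(toy_df).show(truncate=False)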

Code examples

We will see two examples of how to train supervised Machine Learning models. Supervised models learn from labeled datasets. This means that we have a dataset with some variables for each occurrence and a label for that occurrence.

The main objective is to predict a label for a new occurrence for which we don't have the label.

Regression: Predicting a Continuous Variable

In the following example we will load the auto-mpg.csv dataset. The description says (http://archive.ics.uci.edu/ml/datasets/Auto+MPG)

"The data concerns city-cycle fuel consumption in miles per gallon, to be predicted in terms of 3 multivalued discrete and 5 continuous attributes." (Quinlan, 1993)

So, first of all, let's load the dataset:


In [42]:
auto_df = spark.read.csv(path = '../data/auto-mpg.csv', 
                         header = True, 
                         inferSchema = True)

In [43]:
auto_df.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cylinders: integer (nullable = true)
 |-- displacement: double (nullable = true)
 |-- horsepower: string (nullable = true)
 |-- weight: double (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- origin: integer (nullable = true)
 |-- name: string (nullable = true)

Our main goal is to build a predictive model that takes some input variables representing the vehicle features, and outputs the consumption of the vehicle in miles per gallon.


In [44]:
pred_vars = ['cylinders', 'displacement', 'weight', 'acceleration', 'year', 'origin']

To predict the consumption we will use a Linear Regression model. We won't go into details about the model itself, but we have to take into account the following considerations:

  • The model will take as input a Vector representing the vehicle characteristics
  • All input variables must be numeric variables
  • The Linear Regressor model is an Estimator
  • A Linear Regressor is a Supervised Machine Learning algorithm
  • The model obtained is a Transformer

To generate the training dataset we will use a VectorAssembler, which is a Transformer.

VectorAssembler takes a DataFrame as input and outputs the same DataFrame with the specified columns assembled into a single vector column.


In [45]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(
    inputCols = pred_vars, 
    outputCol = 'features')
train_df = vectorAssembler.transform(auto_df)

train_df = train_df.withColumn("label", auto_df["mpg"])

train_df = train_df.select(['features', 'label'])
train_df.show(3)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[8.0,307.0,3504.0...| 18.0|
|[8.0,350.0,3693.0...| 15.0|
|[8.0,318.0,3436.0...| 18.0|
+--------------------+-----+
only showing top 3 rows

Then we create a LinearRegression Estimator (skipping the parameter details). Remember that lrModel will be a Transformer.


In [47]:
from pyspark.ml.regression import LinearRegression

# LinearRegression is an Estimator
lr = LinearRegression(maxIter=10, 
                      regParam=0.3, 
                      elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(train_df)
# lrModel will contain a Transformer
type(lrModel)


Out[47]:
pyspark.ml.regression.LinearRegressionModel

To make predictions, we just have to apply the transformer to the features of a given dataset.


In [48]:
predictions = lrModel.transform(train_df.select(['features']))

predictions.show(5)


+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[8.0,307.0,3504.0...|15.516094979306784|
|[8.0,350.0,3693.0...| 14.40174603470855|
|[8.0,318.0,3436.0...|15.891039618272371|
|[8.0,304.0,3433.0...|15.930576188946931|
|[8.0,302.0,3449.0...|15.804137052500417|
+--------------------+------------------+
only showing top 5 rows

Classification: Learning to predict text classes

In this second example we will see how to classify text using a Logistic Regression model. The aim of this example is to learn how to chain several Transformers and an Estimator in a Pipeline.

We will use two transformers and an estimator:

  • Tokenizer: a Transformer that takes a text column as input and generates a column with the tokenized words

In [ ]:
from pyspark.ml.feature import Tokenizer

df = spark.createDataFrame([
    (0,"the cat in the mat is flat"),
    (1,"the mouse with the hat is nice")
], ["id","text"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")

tok_df = tokenizer.transform(df)

tok_df.select("words").collect()
  • CountVectorizer: converts a list of words into a vector of counts. It does so by mapping each word of the vocabulary to an index and then, at each position of the vector (which represents a word), counting the occurrences of that word in the original list.

In [ ]:
from pyspark.ml.feature import CountVectorizer

count_vec = CountVectorizer(inputCol="words", outputCol="features")

counter = count_vec.fit(tok_df)

count_df = counter.transform(tok_df)
count_df.show()

In [ ]:
counter.vocabulary
  • LogisticRegression: takes some input variables with a label and constructs a classification model
  • Pipeline: puts together transformers and estimators

In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, a CountVectorizer (here named hashingTF), and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = CountVectorizer(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

Model Evaluation

Train/Test separation

The most straightforward approach to evaluate a supervised model is to split the original dataset into two subsets.

  • Training subset: this set is used to train a model. Normally, supervised Machine Learning algorithms try to minimize an error value, so the algorithm uses the features and labels from the training dataset to learn a model that minimizes this error.

    However, if we expose the algorithm to too much learning, it may begin to memorize the occurrences in the training dataset. This is the so-called overfitting problem, and it leads to poor generalization: we end up with a model that performs very well on the training data but makes poor predictions on new occurrences.

  • Test subset: if we separate a group of occurrences that are hidden from the algorithm during training, we can use them to check how our model behaves on new occurrences. We can do so because the test subset is a properly labelled set of data, so we can compare the actual label with the predicted label. From this difference we can build an analysis of the quality of the model.

Evaluation of a Regression Model

To evaluate a Regression model we can use the following metrics:

  • Mean Absolute Error (MAE) is the mean of the absolute values of the errors:
$$ \frac{1}{n} \sum_{i=1}^{n} |y_{i}-\hat{y}_{i}| $$
  • Root Mean Squared Error (RMSE) is the square root of the mean of the squared errors:
$$ \sqrt{\frac{1}{n} \sum_{i=1}^{n} (y_{i}-\hat{y}_{i})^2} $$

In [ ]:
auto_df = spark.read.csv(path = '../data/auto-mpg.csv', 
                         header = True, 
                         inferSchema = True)

pred_vars = ['cylinders', 'displacement', 'weight', 'acceleration', 'year', 'origin']

vectorAssembler = VectorAssembler(
    inputCols = pred_vars, 
    outputCol = 'features')

vec_auto_df = vectorAssembler.transform(auto_df)

vec_auto_df = vec_auto_df.withColumn("label", auto_df["mpg"])

vec_auto_df = vec_auto_df.select(['features', 'label'])

train_auto_df, test_auto_df = vec_auto_df.randomSplit([0.9, 0.1], seed=12345)

In [ ]:
lr = LinearRegression(maxIter=10, 
                      regParam=0.3, 
                      elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(train_auto_df)

In [ ]:
predicted_auto_df = lrModel.transform(test_auto_df)

In [ ]:
from pyspark.ml.evaluation import RegressionEvaluator

lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="label", metricName="mae")

results = lr_evaluator.evaluate(predicted_auto_df)

print("R Squared (R2) on test data = %g" % results)

Evaluation of a Classification Model

We will focus on the evaluation of a binary classification model. This means that the classification model chooses between two exclusive classes, for example "a" and "b".

A binary classifier can be seen as a classifier that tells whether an occurrence belongs to one of the classes, say "a". When the classifier outputs a positive value, the occurrence is predicted to belong to class "a"; when it outputs a negative value, the occurrence is predicted not to belong to class "a", and hence to belong to class "b".

source: https://en.wikipedia.org/wiki/Precision_and_recall

When we classify a new occurrence, we can have the following 4 cases:


  • True Positive: The prediction and the actual value are the same, a positive value
  • True Negative: The prediction and the actual value are the same, a negative value
  • False Positive: The prediction and the actual value differ, the actual value is negative but the predicted value is positive
  • False Negative: The prediction and the actual value differ, the actual value is positive but the predicted value is negative

Taking these definitions into account we can define the following metrics:

  • Accuracy: among all the samples, how many are correctly classified $$ acc = \frac{TP+TN}{TP+TN+FP+FN}$$
  • Precision: of those the model predicted as positive, how many are actually positive $$ prec = \frac{TP}{TP+FP} $$
  • Recall: of those that are actually positive, how many the model labels correctly $$ rec = \frac{TP}{TP+FN} $$
  • F1 measure: the harmonic mean of precision and recall $$ F_1 = 2 \cdot \frac{prec \cdot rec}{prec + rec} $$

In [ ]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

iris_df = spark.read.csv(path = '../data/iris.data', 
                         header = False, 
                         inferSchema = True)

iris_df.printSchema()

setosa_udf = udf(lambda x: "a" if x == "Iris-setosa" else "b", StringType())
iris_df = iris_df.withColumn("_c5", setosa_udf(iris_df["_c4"]))

pred_vars = ['_c0', '_c1', '_c2', '_c3']

vectorAssembler = VectorAssembler(
    inputCols = pred_vars, 
    outputCol = 'features')
vec_iris_df = vectorAssembler.transform(iris_df)

indexer = StringIndexer(inputCol="_c5", outputCol="categoryIndex")
vec_iris_df = indexer.fit(vec_iris_df).transform(vec_iris_df)

vec_iris_df.sample(False, .05).show()

vec_iris_df = vec_iris_df.withColumn("label", vec_iris_df["categoryIndex"])

train_iris_df, test_iris_df = vec_iris_df.randomSplit([0.9, 0.1], seed=12345)

In [ ]:
lr = LogisticRegression(maxIter=10, regParam=0.001)

# Fit the model
lrModel = lr.fit(train_iris_df)

#predict
predictions = lrModel.transform(test_iris_df)

In [ ]:
from pyspark.mllib.evaluation import MulticlassMetrics

predictionAndLabels = predictions.rdd.map(lambda x: (x.prediction, x.label))

metrics = MulticlassMetrics(predictionAndLabels)

print("accuracy: {}".format(metrics.accuracy))
print("recall: {}".format(metrics.recall()))
print("precision: {}".format(metrics.precision()))
print("f1 measure: {}".format(metrics.fMeasure()))

Productionalization

When dealing with productionalization, we can divide the way our model will behave into four flavours, depending on two main dimensions:

  • How prediction will run: ranging from more static to more dynamic, we can differentiate between running the model over a static dataset and letting our users query the model as they need to.

  • How learning will run: ranging from more static to more dynamic, we can differentiate between running the learning process once and re-learning as soon as we acquire new labeled samples.